package com.atguigu.gmall.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.gmall.realtime.utils.DimUtil;
import com.atguigu.gmall.realtime.utils.ThreadPoolUtil;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

import java.util.Collections;
import java.util.concurrent.ExecutorService;


//采用模板方法设计模式
    //在父类中完成功能的骨架，在每个子类中都可以有不同的实现,RichAsyncFunction是抽象类，方法，需要具体我们去实现
public abstract class DimAsyncFunction<T> extends RichAsyncFunction<T,T> implements DimJoinFunction<T> {

    //定义线程池连接对象
    private ExecutorService executorService;
    //定义表名属性，使得其他的类调用构造方法时,可以传进表名，获取查询的表名,然后再设法去获取key，也就是id
    private String tableName;

    //类的构造器
    public DimAsyncFunction(String tableName) {
        this.tableName = tableName;
    }


    //需要在声明周期中创建一次，线程池连接对象即可,所以富函数的open方法

    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("----获取线程池连接对象----");
        //创建连接对象
         executorService = ThreadPoolUtil.getInstance();
    }

    //异步方法,T obj代表流中的一条数据
    //重写asyncInvoke方法，在该方法中通过线程池获取线程，发送异步请求
    @Override
    public void asyncInvoke(T obj, ResultFuture<T> resultFuture) throws Exception {
        //当流中的数据流过来之后，会从线程池中获取一个线程，并且执行run方法
        executorService.submit(
                new Runnable() {
                    @Override
                    public void run() {
                        try {
                            //线程开始时间
                            long start = System.currentTimeMillis();
                            //获取想要关联的维度key，这个方法，会在后面被调用的类重写
                            String key = getKey(obj);
                            //获取维度数据,通过之前的工具类，可以查询到一个结果,为Json类形式
                            JSONObject dimInfoCacheJsonObj = DimUtil.getDimInfoCache(tableName, key);
                            //尾端关联，进行join,可以先判断得到是是否为空
                            if(dimInfoCacheJsonObj !=null){
                                //通过join也是在后面子类中进行实现,一个是传进来的类，此是宽表类，后面是查询到的json维度信息
                                join(obj,dimInfoCacheJsonObj);
                            }
                            //线程结束时间
                            long end = System.currentTimeMillis();
                            System.out.println("异步查询维度"+tableName+"耗时间为:"+(end-start)+"毫秒");
                            //接受异步请求的结果
                            resultFuture.complete(Collections.singleton(obj));
                        } catch (Exception e) {
                            e.printStackTrace();
                            throw  new RuntimeException("异步查询维度失败");
                        }
                    }
                }
        );

    }
}
